前一章中，讨论了C++中的并发和多线程的基础知识。并发代码设计中最大的挑战是正确处理数据竞争。线程同步和调度也不是一个容易掌握的知识点。我们可以在任何怀疑有数据竞争的地方使用同步原语，比如：互斥锁，但这并不是最佳方式。 \par
设计并发代码的更好方法是不惜一切代价不使用锁。这不仅会提高应用程序的性能，而且会使它比以前更安全。说起来容易做起来难——无锁编程是本章的一个具有挑战性的主题。特别是，我们将进一步深入设计无锁算法和数据结构的基础知识。这是许多优秀开发者不断研究的一个课题。我们将接触无锁编程的基础知识，这将使您了解如何以一种有效的方式构造代码。阅读完本章后，读者能够更好地理解数据竞争的问题，并获得设计并发算法和数据结构所需的基本知识。这些可能对构建容错系统的设计也有帮助。 \par
本章中，我们将了解以下内容: \par

\begin{itemize}
	\item 理解数据竞争和基于锁的解决方案
	\item 在C++中使用原子操作
	\item 设计无锁的数据结构
\end{itemize}

\noindent\textbf{}\ \par
\textbf{编译器要求} \ \par
g++编译器需要添加编译选项 \texttt{-std=c++2a} 来编译本章的代码。可以从这里获取本章的源码文件：https:/​/github.​com/PacktPublishing/Expert-CPP \par

\noindent\textbf{}\ \par
\textbf{近距离观察数据竞争} \ \par
如前所述，数据竞争是开发者需要不惜一切代价试图避免的情况。前一章中，我们讨论了死锁以及避免它的方法。我们在前一章中使用的最后一个例子，是创建线程安全的单例模式。假设使用一个类来创建数据库连接(一个经典的例子)。 \par
下面是数据库连接模式的一个简单实现。每次访问数据库时，保持一个单独的连接并不是一个好做法。所以，我们重用现有的连接从程序不同的部分查询数据库: \par

\begin{lstlisting}[caption={}]
namespace Db {
	class ConnectionManager
	{
	public:
		static std::shared_ptr<ConnectionManager> get_instance()
		{
			if (instance_ == nullptr) {
				instance_.reset(new ConnectionManager());
			}
			return instance_;
		}
		// Database connection related code omitted
	private:
		static std::shared_ptr<ConnectionManager> instance_{nullptr};
	};
}
\end{lstlisting}

让我们更详细地讨论这个示例。前一章中，我们加入了锁来保护get\underline{ }instance()函数不受数据竞争的影响。让我们详细说明一下这样做的原因。为了简化这个例子，下面是我们感兴趣部分的四行伪码: \par

\begin{lstlisting}[caption={}]
get_instance()
	if (_instance == nullptr)
		instance_.reset(new)
	return instance_;
\end{lstlisting}

现在，假设我们运行一个访问get\underline{ }instance()函数的线程。我们将其命名为Thread A，它执行的第一行是条件语句，如下所示: \par

\begin{lstlisting}[caption={}]
get_instance()
	if (_instance == nullptr) <--- Thread A
		instance_.reset(new)
	return instance_;
\end{lstlisting}

它将逐行执行指令。更让我们感兴趣的是第二个线程(标记为Thread B)，它开始执行与Thread A并发的函数。函数的并发执行过程中可能会出现以下情况: \par

\begin{lstlisting}[caption={}]
get_instance()
	if (_instance == nullptr) <--- Thread B (checking)
		instance_.reset(new)  <--- Thread A (already checked)
	return instance_;
\end{lstlisting}

Thread B在比较instance\underline{ }和nullptr时为真。Thread A传递了相同的检查并将instance\underline{ }设置为一个新对象。虽然从Thread A的角度来看，一切看起来都很好，但它只是传递了条件检查，重置实例，并将转到下一行返回instance\underline{ }。然而，Thread B在instance\underline{ }的值改变之前比较了它。因此，Thread B也会继续设置instance\underline{ }的值: \par

\begin{lstlisting}[caption={}]
get_instance()
	if (_instance == nullptr)
		instance_.reset(new) <--- Thread B (already checked)
	return instance_; 		 <--- Thread A (returns)
\end{lstlisting}

前面的问题是，Thread B在instance\underline{ }设置好之后，又重新设置了它。它由几个指令组成，每个指令都由一个线程按顺序执行。为了使两个线程不相互干扰，操作不应该包含一个以上的指令。 \par
我们关心数据竞争的原因是前面代码块中的间隙，行与行之间的间隙允许线程相互干扰。因为这个解决方案可能不是正确的，所以使用同步原语(比如互斥锁)的解决方案时，应该想象所有的空隙。下面的修改使用了互斥锁和双重检查锁的模式: \par

\begin{lstlisting}[caption={}]
static std::shared_ptr<ConnectionManager> get_instance()
{
	if (instance_ == nullptr) {
		// mutex_ is declared in the private section
		std::lock_guard lg{mutex_};
		if (instance_ == nullptr) { // double-checking
			instance_.reset(new ConnectionManager());
		}
	}
	return instance_;
}
\end{lstlisting}

下面是两个线程试图访问instance\underline{ }对象时会发生的事情: \par

\begin{lstlisting}[caption={}]
get_instance()
	if (instance_ == nullptr) 	<--- Thread B
		lock mutex 				<--- Thread A (locks the mutex)
		if (instance_ == nullptr)
			instance_.reset(new)
		unlock mutex
	return instance_
\end{lstlisting}

现在，即使两个线程都通过了第一次检查，其中一个线程也会锁定互斥锁。当一个线程试图锁定互斥锁时，另一个线程将重置实例。为了确保它没有被设置，我们使用了第二个检查(这就是为什么它被称为双重检查): \par

\begin{lstlisting}[caption={}]
get_instance()
	if (instance_ == nullptr)
		lock mutex 					<--- Thread B (tries to lock, waits)
		if (instance_ == nullptr) 	<--- Thread A (double check)
			instance_.reset(new)
		unlock mutex
	return instance_
\end{lstlisting}

当Thread A完成了instance\underline{ }的设置后，就会解锁互斥锁，这样Thread B就可以继续锁定并重置instance\underline{ }: \par

\begin{lstlisting}[caption={}]
get_instance()
	if (instance_ == nullptr)
		lock mutex 					<--- Thread B (finally locks the mutex)
		if (instance_ == nullptr) 	<--- Thread B (check is not passed)
			instance_.reset(new)
		unlock mutex 				<--- Thread A (unlocked the mutex)
	return instance_ 				<--- Thread A (returns)
\end{lstlisting}

作为经验法则，应该始终在代码的行与行之间寻找线索。两个语句之间总是有一个间隙，这个间隙会使两个或多个线程相互干扰。下一节将详细讨论数字递增的示例。 \par

\noindent\textbf{}\ \par
\textbf{添加同步机制} \ \par
几乎每一本涉及线程同步的书，都会使用了一个增加数字作为数据竞争示例。本书也不例外，示例如下: \par

\begin{lstlisting}[caption={}]
#include <thread>

int counter = 0;

void foo()
{
	counter++;
}

int main()
{
	std::jthread A{foo};
	std::jthread B{foo};
	std::jthread C{[]{foo();}};
	std::jthread D{
		[]{
			for (int ix = 0; ix < 10; ++ix) { foo(); }
		}
	};
}
\end{lstlisting}

我们添加了两个线程，使示例更加复杂。前面的代码只是使用四个不同的线程递增计数器变量。乍一看，在任何时间点，只有一个线程增量计数器。然而，正如我们在前一节中提到的，我们应该注意并寻找代码中的漏洞。foo()函数似乎缺少一个。自增操作符的行为方式如下(伪代码): \par

\begin{lstlisting}[caption={}]
auto res = counter;
counter = counter + 1;
return res;
\end{lstlisting}

现在，我们发现了本不应该存在的问题。在任何时候，只有一个线程执行前面三条指令中的一条。也就是说，可能出现如下情况: \par

\begin{lstlisting}[caption={}]
auto res = counter; <--- thread A
counter = counter + 1; <--- thread B
return res; <--- thread C
\end{lstlisting}

例如：thread B可能修改counter的值，而thread A读取了修改之前的值。这意味着thread A将在thread B已经完成该操作时，给counter分配一个新的增量值。非常混乱，我们的大脑迟早会因试图理解操作的顺序而爆炸。作为一个经典示例，我们将使用线程锁的机制来解决它。这里有一个解决方案: \par

\begin{lstlisting}[caption={}]
#include <thread>
#include <mutex>

int counter = 0;
std::mutex m;

void foo()
{
	std::lock_guard g{m};
	counter++;
}

int main()
{
	// code omitted for brevity
}
\end{lstlisting}

无论哪个线程先到达lock\underline{ }guard，都会首先锁定mutex，如下所示: \par

\begin{lstlisting}[caption={}]
lock mutex; <--- thread A, B, D wait for the locked mutex
auto res = counter; <--- thread C has locked the mutex
counter = counter + 1;
unlock mutex; <--- A, B, D are blocked until C reaches here
return res;
\end{lstlisting}

使用锁的问题是性能。理论上，我们使用线程来加速程序的执行，更确切地说是数据处理。在大数据集合的情况下，使用多线程可能会大大提高程序的性能。但是，在多线程环境中，我们首先要注意并发访问，因为使用多个线程访问集合可能会导致数据的损坏。让我们来看看线程安全的堆栈实现。 \par

\noindent\textbf{}\ \par
\textbf{实现一个线程安全的栈} \ \par
回顾第6章中的堆栈数据结构适配器，我们将使用锁来实现线程安全的堆栈版本。栈有两个基本操作，push和pop。它们都修改容器的状态。堆栈本身不是容器，它是一种适配器，其包装了一个容器，并提供了一个适合访问的接口。合并线程安全，我们把std::stack封装在一个新类中。除了构造函数和销毁函数外，std::stack还提供了以下函数: \par

\begin{itemize}
	\item top() : 访问堆栈的顶部元素
	\item empty() : 如果堆栈为空，则返回true
	\item size() : 返回堆栈的当前大小
	\item push() : 在堆栈(顶部)中插入一个新项
	\item emplace() : 在堆栈的顶部构造一个元素
	\item pop() : 删除堆栈的顶部元素
	\item swap() : 将内容与另一个栈交换
\end{itemize}

我们将保持它的简单性，并将重点放在线程安全上。这里主要关注的是修改底层数据结构的函数。我们感兴趣的是push()和pop()函数。如果多个线程相互干扰，这些函数可能会破坏数据结构。因此，下面的声明是表示线程安全堆栈的类: \par

\begin{lstlisting}[caption={}]
template <typename T>
class safe_stack
{
public:
	safe_stack();
	safe_stack(const safe_stack& other);
	void push(T value); // we will std::move it instead of copy-referencing
	void pop();
	T& top();
	bool empty() const;
private:
	std::stack<T> wrappee_;
	mutable std::mutex mutex_;
}
\end{lstlisting}

注意，我们将mutex\underline{ }声明为可变的，因为我们将它锁定在const函数empty()中。与删除empty()的常量相比，这可以说是一个更好的设计选择。然而，对数据成员使用“可变”表示我们做出了糟糕的设计选择。无论如何，safe\underline{ }stack的客户端代码不会太关心实现的内部细节，甚至不知道堆栈使用互斥锁来同步并发访问。 \par
现在让我们看看它成员函数的实现和一个简短的描述。让我们从复制构造函数开始: \par

\begin{lstlisting}[caption={}]
safe_stack::safe_stack(const safe_stack& other)
{
	std::lock_guard<std::mutex> lock(other.mutex_);
	wrappee_ = other.wrappee_;
}
\end{lstlisting}

注意，我们锁定了另一个堆栈的互斥锁。尽管看起来不公平，但我们需要确保在复制另一个栈的基础数据时，数据不会被修改。 \par
接下来，让我们看看push()函数的实现。我们锁定互斥锁并将数据推入底层堆栈: \par

\begin{lstlisting}[caption={}]
void safe_stack::push(T value)
{
	std::lock_guard<std::mutex> lock(mutex_);
	// note how we std::move the value
	wrappee_.push(std::move(value));
}
\end{lstlisting}

几乎所有的函数都以相同的方式合并线程同步:锁定互斥锁、执行任务和解锁互斥锁。这确保了在任何时候只有一个线程在访问数据。也就是说，为了保护数据不受竞争条件的影响，我们必须确保函数不变量没有被破坏。 \par

\hspace*{\fill} \\ %插入空行
\includegraphics[width=0.05\textwidth]{images/tip}
如果你不喜欢输入很长的C++类型名，比如：std::lock\underline{ }guard<std::mutex>，可以使用using关键字为类型创建简短的别名，例如，使用\texttt{locker = std::guard<std::mutex>;}。 \par
\noindent\textbf{}\ \par

现在，转到pop()函数，我们可以修改类声明，使pop()直接返回堆栈顶部的值。我们这样做的主要原因是不希望访问栈顶(使用引用)，然后从另一个线程中弹出数据。因此，我们将修改pop()函数，使其成为一个共享对象，然后返回堆栈元素: \par

\begin{lstlisting}[caption={}]
std::shared_ptr<T> pop()
{
	std::lock_guard<std::mutex> lock(mutex_);
	if (wrappee_.empty()) {
		throw std::exception("The stack is empty");
	}
	std::shared_ptr<T>
	top_element{std::make_shared<T>(std::move(wrappee_.top()))};
	wrappee_.pop();
	return top_element;
}
\end{lstlisting}

注意，safe\underline{ }stack类的声明也应该根据pop()函数的改变而改变。此外，我们不再需要top()。 \par

\noindent\textbf{}\ \par
\textbf{设计无锁的数据结构} \ \par
如果有一个线程可以保证进程进展，那么就可称其是一个无锁函数。与基于锁的函数(其中一个线程可以阻塞另一个线程，都可能在取得进展之前等待某些条件)相比，无锁状态确保有一个线程取得进展。使用数据同步原语的算法和数据结构正在阻塞，一个线程被挂起，直到另一个线程执行一个操作。这意味着在块移除(通常是为互斥锁解锁)之前，线程不能继续运行。我们的关注点在于不使用阻塞函数的数据结构和算法。我们称其中一些为无锁算法，尽管还应该区分非阻塞算法和数据结构的类型。 \par

\noindent\textbf{}\ \par
\textbf{使用原子类型} \ \par
前面，我们介绍了作为数据争用的原因的源代码行之间的间隙。当有一个包含不止一条指令的操作时，大脑就会提醒你可能出现的问题。然而，无论你如何努力使操作独立和单一，在大多数情况下，如果不将操作分解为涉及多个指令的步骤，将无法实现。C++通过提供原子类型来解决这个问题。 \par
首先，为什么使用原子这个词。一般来说，我们理解原子是指不能分解成更小部分的东西。也就是说，原子操作是一种不能做一半的操作:要么做了，要么没做。原子操作的例子可能是对整数的简单赋值: \par

\begin{lstlisting}[caption={}]
num = 37;
\end{lstlisting}

如果两个线程访问这行代码，它们都不会遇到半途而废的代码。换句话说，任务之间没有间隙。当然，如果num表示带有用户定义的赋值操作符的复杂对象，同样的语句可能会有很多间隙。 \par

\hspace*{\fill} \\ %插入空行
\includegraphics[width=0.05\textwidth]{images/warn}
原子操作是一种不可分割的操作。 \par
\noindent\textbf{}\ \par

另一方面，非原子操作可以视为完成了一半，例子是我们前面讨论的自增操作。在C++中，所有关于原子类型的操作也是原子类型。这意味着我们可以通过使用原子类型来避免行之间的间隙。在使用原子之前，我们可以通过使用互斥来创建原子操作。例如，我们可以认为下面的函数是原子的: \par

\begin{lstlisting}[caption={}]
void foo()
{
	mutex.lock();
	int a{41};
	int b{a + 1};
	mutex.unlock();
}
\end{lstlisting}

真正的原子操作和上面假原子操作之间的区别是，原子操作不需要锁。这实际上是一个很大的区别，因为互斥等同步机制合并了开销和性能损失。更准确地说，原子类型利用低级机制来确保指令的独立性和原子性。标准原子类型定义在<atomic>头文件中，标准原子类型也可以使用内部锁定。为了确保它们不使用内部锁定，标准库中的所有原子类型都is\underline{ }lock\underline{ }free()函数。 \par

\hspace*{\fill} \\ %插入空行
\includegraphics[width=0.05\textwidth]{images/warn}
唯一没有is\underline{ }lock\underline{ }free()成员函数的原子类型是std::atomic\underline{ }flag。此类型上的操作必须是无锁的。它是一个布尔标志，大多数时候它用作实现其他无锁类型的基值。 \par
\noindent\textbf{}\ \par

也就是说，如果直接使用原子指令对obj进行操作，obj.is\underline{ }lock\underline{ }free()将返回true。如果返回false，则表示使用了内部锁定。如果原子类型对于所有支持的硬件都是无锁的，则静态constexpr函数is\underline{ }always\underline{ }lock\underline{ }free()返回true。由于该函数是constexpr，允许我们定义该类型在编译时是否无锁。这是一个很大的进步，会以一种良好的方式影响代码的结构和执行。例如，std::atomic<int>::is\underline{ }always\underline{ }lock\underline{ }free()返回true，因为std::atomic<int>很可能总是无锁的。 \par

\hspace*{\fill} \\ %插入空行
\includegraphics[width=0.05\textwidth]{images/tip}
在希腊语中，a的意思是不，tomo的意思是切。原子这个词来自希腊语atomos，翻译过来就是不可切割的。也就是说，我们认为原子是不可分割的最小单位。我们使用原子类型和操作来避免指令之间的间隙。 \par
\noindent\textbf{}\ \par

我们对原子类型使用特化，例如：\texttt{std::atomic<long>;}。可以参考下表来获得更方便的原子类型名称。表的左列包含原子类型，右列包含它的特化: \par

\begin{table}[h]
	\begin{tabularx}{\textwidth}{|X|X|}
		\hline 原子类型  & 特化版 \\ 
		\hline atomic\underline{ }bool   & std::atomic<bool> \\
		\hline atomic\underline{ }char   & std::atomic<char> \\
		\hline atomic\underline{ }schar   & std::atomic<signed char> \\
		\hline atomic\underline{ }uchar   & std::atomic<unsigned char> \\ 
		\hline atomic\underline{ }int   & std::atomic<int> \\
		\hline atomic\underline{ }uint   & std::atomic<unsigned> \\
		\hline atomic\underline{ }short   & std::atomic<short> \\
		\hline atomic\underline{ }ushort   & std::atomic<unsigned short> \\ 
		\hline atomic\underline{ }long   & std::atomic<long> \\
		\hline atomic\underline{ }ulong   & std::atomic<unsigned long> \\
		\hline atomic\underline{ }llong   & std::atomic<long long> \\
		\hline atomic\underline{ }ullong   & std::atomic<unsigned long long> \\
		\hline atomic\underline{ }char16\underline{ }t   & std::atomic<char16\underline{ }t> \\
		\hline atomic\underline{ }char32\underline{ }t   & std::atomic<char32\underline{ }t> \\
		\hline atomic\underline{ }wchar\underline{ }t   & std::atomic<wchar\underline{ }t> \\
		\hline
	\end{tabularx}
\end{table}

上表表示基本原子类型。正则类型和原子类型之间的根本区别是，我们可以对它们进行操作。现在让我们更详细地讨论原子操作。 \par

\noindent\textbf{}\ \par
\textbf{操作原子的类型} \ \par
回想一下我们在前一节中讨论的间隙。原子类型的目标是消除指令之间的间隙，或者提供一些操作，这些操作负责将几个指令包装成一条指令组合在一起。以下是对原子类型的操作: \par

\begin{itemize}
	\item load()
	\item store()
	\item exchange()
	\item compare\underline{ }exchange\underline{ }weak()
	\item compare\underline{ }exchange\underline{ }strong()
	\item wait()
	\item notify\underline{ }one()
	\item notify\underline{ }all()
\end{itemize}

load()操作自动加载并返回原子变量的值。store()用提供的非原子参数替换原子变量的值。 \par
load()和store()都类似于对非原子变量的读取和分配操作。每当我们访问一个对象的值时，我们执行一个read指令。例如，下面的代码打印了double变量的内容: \par

\begin{lstlisting}[caption={}]
double d{4.2}; // "store" 4.2 into "d"
std::cout << d; // "read" the contents of "d"
\end{lstlisting}

原子类型的情况下，类似的读操作转换为: \par

\begin{lstlisting}[caption={}]
atomic_int m;
m.store(42); // atomically "store" the value
std::cout << m.load(); // atomically "read" the contents
\end{lstlisting}

虽然前面的代码没有任何意义，但包含了这个示例来表示处理原子类型时的区别。访问原子变量应该通过原子操作来完成。下面的代码表示load()、store()和exchange()函数的定义: \par

\begin{lstlisting}[caption={}]
T load(std::memory_order order = std::memory_order_seq_cst) const noexcept;
void store(T value, std::memory_order order =
	std::memory_order_seq_cst) noexcept;
T exchange(T value, std::memory_order order =
	std::memory_order_seq_cst) noexcept;
\end{lstlisting}

有一个类型为std::memory\underline{ }order的额外参数。exchange()函数由store()和load()函数组成，以原子的方式用提供的参数替换值，并原子地获取前一个值。 \par
compare\underline{ }exchange\underline{ }weak()和compare\underline{ }exchange\underline{ }strong()函数的原理类似。下面是它们的定义: \par

\begin{lstlisting}[caption={}]
bool compare_exchange_weak(T& expected_value, T target_value,
	std::memory_order order =
	std::memory_order_seq_cst) noexcept;
bool compare_exchange_strong(T& expected_value, T target_value,
	std::memory_order order =
	std::memory_order_seq_cst) noexcept;
\end{lstlisting}

比较第一个参数(expected\underline{ }value)与原子变量，如果它们相等，则用第二个参数(target\underline{ }value)替换该变量。否则，它们会自动地将值加载到第一个参数中(这就是为什么它是通过引用传递的)。弱交换和强交换之间的区别是，compare\underline{ }exchange\underline{ }weak()允许错误地失败(称为假失败)，也就是说，即使expected\underline{ }value等于基值，函数也认为它们不相等。这样做是因为在某些平台上，可以提升性能。 \par
wait()、notify\underline{ }one()和notify\underline{ }all()函数从C++20开始添加。wait()函数阻塞线程，直到修改原子对象的值。它接受一个参数来与原子对象的值进行比较。如果两个值相等，则阻塞线程。要手动解除线程阻塞，可以调用notify\underline{ }one()或notify\underline{ }all()。它们之间的区别是，notify\underline{ }one()至少解除一个被阻塞的操作的阻塞，而notify\underline{ }all()解除所有这些操作的阻塞。 \par
现在，让我们讨论一下在前面声明的原子类型成员函数时遇到的内存序。memory\underline{ }order定义了原子操作前后内存访问的顺序。当多个线程同时读写变量时，一个线程读取变量的顺序与另一个线程存储变量的顺序不同。原子操作的默认顺序是连续一致的顺序——这就是std::memory\underline{ }order\underline{ }seq\underline{ }cst的作用。有几种类型的内存序，包括memory\underline{ }order\underline{ }relax、memory\underline{ }order\underline{ }consume、memory\underline{ }order\underline{ }acquire、memory\underline{ }order\underline{ }release、memory\underline{ }order\underline{ }acq\underline{ }rel和memory\underline{ }order\underline{ }seq\underline{ }cst。下一节中，我们将设计一个使用默认内存顺序的原子类型的无锁堆栈。 \par

\noindent\textbf{}\ \par
\textbf{设计一个无锁的栈} \ \par
在设计堆栈时，要记住的关键事情是确保推送的值可以安全地从另一个线程返回。同样重要的是确保只有一个线程返回值。 \par
前面几节中，我们实现了一个包装std::stack的基于锁的堆栈。我们知道堆栈不是一个真正的数据结构，而是一个适配器。通常，在实现堆栈时，我们选择向量或链表作为其底层数据结构。让我们看一个基于链表的无锁堆栈示例。将新元素推入堆栈涉及到创建一个新的列表节点，将它的next指针设置为当前的头节点，然后将头节点设置为指向新插入的节点。 \par

\hspace*{\fill} \\ %插入空行
\includegraphics[width=0.05\textwidth]{images/warn}
如果你对“头”或“next指针”这两个术语感到困惑，请回顾第6章，我们在其中详细讨论了链表。 \par
\noindent\textbf{}\ \par

单线程上下文中，所描述的步骤是正确的。但是，如果有多个线程在修改堆栈，我们就应该担心了。让我们找出push()操作的缺陷。当一个新元素被推入堆栈时，下面是三个主要步骤: \par

\begin{enumerate}
	\item \texttt{node* new\underline{ }elem = new node(data); }
	\item \texttt{new\underline{ }elem->next = head\underline{ }; }
	\item \texttt{head\underline{ } = new\underline{ }elem; }
\end{enumerate}

第1步中，我们将新节点插入到底层链表中。第2步描述我们将它插入到列表的前面——这就是为什么新节点的next指针指向head\underline{ }。最后，由于head\underline{ }指针代表列表的起点，我们应该将其值重置为指向新添加的节点，就像第3步中那样。 \par
节点类型是我们在堆栈中用来表示列表节点的内部结构。下面是它的定义: \par

\begin{lstlisting}[caption={}]
template <typename T>
class lock_free_stack
{
	private:
	struct node {
		T data;
		node* next;
		node(const T& d) : data(d) {}
	}
	node* head_;
// the rest of the body is omitted for brevity
};
\end{lstlisting}

我们建议您做的第一件事是在当前代码中寻找问题——不是在前面的代码中，而是在我们描述的将新元素放入堆栈的步骤中。假设两个线程同时在添加节点。第2步中的一个线程将新元素的next指针设置为指向head\underline{ }。另一个线程使head\underline{ }指针指向另一个新元素。很明显，这可能会导致数据损坏。对于一个线程来说，步骤2和步骤3有相同的head\underline{ }是至关重要的。为了解决步骤2和步骤3之间的竞争条件，我们应该使用原子比较/交换操作来保证在之前读取head\underline{ }的值时head\underline{ }没有修改。由于需要原子地访问head指针，下面是如何修改lock\underline{ }free\underline{ }stack类中的head\underline{ }成员: \par

\begin{lstlisting}[caption={}]
template <typename T>
class lock_free_stack
{
private:
	// code omitted for brevity
	std::atomic<node*> head_;
	// code omitted for brevity
};
\end{lstlisting}

下面是我们如何实现原子head\underline{}指针的无锁push(): \par

\begin{lstlisting}[caption={}]
void push(const T& data)
{
	node* new_elem = new node(data);
	new_elem->next = head_.load();
	while (!head_.compare_exchange_weak(new_elem->next, new_elem));
}
\end{lstlisting}

我们使用compare\underline{ }exchange\underline{ }weak()来确保head\underline{ }指针与我们在new\underline{ }elem->next中存储的值相同。如果是，则将其设置为new\underline{ }elem。当compare\underline{ }exchange\underline{ }weak()执行成功，就可以确定节点已经成功插入到列表中。 \par
看看我们如何使用原子操作访问节点。类型为T - std::atomic<T*> -的指针的原子形式提供了相同的接口。除此之外，std::atomic<T*>提供了指向算术操作fetch\underline{ }add()和fetch\underline{ }sub()的指针。它们对存储的地址进行原子加法和减法运算。这里有一个例子: \par

\begin{lstlisting}[caption={}]
struct some_struct {};
any arr[10];
std::atomic<some_struct*> ap(arr);
some_struct* old = ap.fetch_add(2);
// now old is equal to arr
// ap.load() is equal to &arr[2]
\end{lstlisting}

我们故意将该指针命名为old，因为fetch\underline{ }add()将该数字添加到指针的地址中，并返回旧值。这就是为什么old和arr指向同一个地址。 \par
下一节中，我们将介绍有关原子类型的更多操作。现在，让我们回到我们的无锁堆栈。要pop()一个元素，即移除一个节点，我们需要读取head\underline{ }并将其设置为head\underline{ }的next元素，如下所示:\par

\begin{lstlisting}[caption={}]
void pop(T& popped_element)
{
	node* old_head = head_;
	popped_element = old_head->data;
	head_ = head_->next;
	delete old_head;
}
\end{lstlisting}

现在，假设有几个线程并发地执行它。如果两个线程从栈中移除项，读取了相同的head\underline{ }值，该怎么办?这个和其他一些竞态条件产生了下面的实现: \par

\begin{lstlisting}[caption={}]
void pop(T& popped_element)
{
	node* old_head = head_.load();
	while (!head_.compare_exchange_weak(old_head, old_head->next));
	popped_element = old_head->data;
}
\end{lstlisting}

我们在前面的代码中应用了与push()函数几乎相同的逻辑。前面的代码并不完美，应该得到加强。我们建议您努力修改它，以消除内存泄漏。 \par
我们已经看到，无锁实现严重依赖于原子类型和操作。我们在上一节中讨论的操作并不是终点。现在让我们来发现更多的原子操作。 \par

\noindent\textbf{}\ \par
\textbf{原子类型的更多操作方式} \ \par
在上一节中，我们在指向用户定义类型的指针上使用了std::atomic<>。也就是说，我们为list节点声明了以下结构: \par

\begin{lstlisting}[caption={}]
// the node struct is internal to
// the lock_free_stack class defined above
struct node
{
	T data;
	node* next;
};
\end{lstlisting}

节点结构是一个用户定义的类型。在上一节中我们实例化了std::atomic<node*>，但是我们可以用同样的方式，为几乎任何用户定义的类型实例化std::atomic<>，即std::atomic<T>。但是，应该注意到std::atomic<T>的接口仅限于以下函数: \par

\begin{itemize}
	\item load()
	\item store()
	\item exchange()
	\item compare\underline{ }exchange\underline{ }weak()
	\item compare\underline{ }exchange\underline{ }strong()
	\item wait()
	\item notify\underline{ }one()
	\item notify\underline{ }all()
\end{itemize}

现在，让我们根据底层类型的细节，来查看原子类型上可用的操作的完整列表。 \par
std::atomic<>实例化为整型(例如整数或指针)，具有以下操作以及我们前面列出的操作: \par

\begin{itemize}
	\item fetch\underline{ }add()
	\item fetch\underline{ }sub()
	\item fetch\underline{ }or()
	\item fetch\underline{ }and()
	\item fetch\underline{ }xor()
\end{itemize}

此外，除了自增(++)和自减(--)之外，还可以使用以下操作符:+=、-=、|=、\&=和\^=。 \par
最后，一个特殊的原子类型叫做atomic\underline{ }flag，它有两个可用的操作: \par

\begin{itemize}
	\item clear()
	\item test\underline{ }and\underline{ }set()
\end{itemize}

对std::atomic\underline{ }flag使用原子操作时，需要使用clear()函数清除它，而test\underline{ }and\underline{ }set()将值更改为true，并返回前一个值。 \par

\noindent\textbf{}\ \par
\textbf{总结} \ \par
应该考虑一下使用原子操作的std::atomic\underline{ }flag。本章的测试中，我们介绍了一个相当简单的堆栈例子，当然还有更复杂的例子需要研究和遵循。当我们讨论设计并发栈时，我们考虑了两个版本，其中一个表示无锁堆栈。与基于锁的解决方案相比，无锁的数据结构和算法是开发者的最终目标，因为它们提供了避免数据竞争的机制，甚至不需要同步资源。 \par
还介绍了可以在项目中使用的原子类型和操作，以确保指令是不可分割的。如果一条指令是原子的，就不需要担心它的同步。我们强烈建议读者继续研究这个主题，并构建更健壮和复杂的无锁数据结构。下一章中，我们将看到如何设计实际使用的应用程序。 \par

\noindent\textbf{}\ \par
\textbf{问题} \ \par
\begin{enumerate}
	\item 为什么要在多线程的单例实现中检查两次实例?
	\item 在实现基于锁的堆栈的复制构造函数时，我们锁定了另一个堆栈的互斥锁。为什么?
	\item 什么是原子类型和原子操作?
	\item 为什么我们对原子类型使用load()和store() ?
	\item std::atomic<T*>支持哪些操作?
\end{enumerate}

\noindent\textbf{}\ \par
\textbf{扩展阅读} \ \par
\begin{itemize}
	\item Concurrent Patterns and Best Practices by Atul Khot, at https://www.packtpub.com/application-development/concurrent-patterns-and-best-practices
	\item Mastering C++ Multithreading by Maya Posch, at  https:/​/​www.​packtpub.​com/application-​development/​mastering-​c-​multithreading
\end{itemize}

\newpage
































